package com.tdy.cdc.app.dwd;

import com.tdy.cdc.app.BaseSqlApp;
import com.tdy.cdc.common.Constant;
import com.tdy.cdc.util.FlinkSinkUtil;
import com.tdy.cdc.util.FlinkSourceUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

/**
 * Flink SQL job that builds the inpatient stage-summary DWD records.
 *
 * <p>It registers the source table {@code hdsd00_14_01} and the diagnosis table
 * {@code tdy_list_diag}, left-joins them on {@code unique_id} and {@code xds_type},
 * and writes the projected result into the {@code target} table, which is keyed on
 * {@code RECORD_ID} and bound to {@link Constant#TOPIC_DWD_MHS_IPT_STAGE_SUMMARY}.
 *
 * @author NanHuang
 * @Date 2023/3/14
 */
public class App15_DwdMhsIptStageSummary extends BaseSqlApp {

    /**
     * Single source of truth for the application name. It is passed to {@code init}
     * and to both source-table helpers (presumably as job name / consumer group id —
     * confirm against {@code BaseSqlApp} and {@code FlinkSourceUtil}).
     */
    private static final String APP_NAME = "App15_DwdMhsIptStageSummary";

    /**
     * Entry point: hands the job settings to {@code BaseSqlApp.init}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // NOTE(review): 5015 and 2 look like a port and a parallelism — confirm in BaseSqlApp.init.
        new App15_DwdMhsIptStageSummary().init(
                5015,
                APP_NAME,
                2
        );
    }

    /**
     * Runs the three pipeline steps in order: register sources, join, write to the sink.
     *
     * @param env      stream execution environment (not used directly by this job)
     * @param tableEnv table environment on which all SQL statements are executed
     */
    @Override
    protected void invoke(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv) {
        // 1. Create the source tables.
        createSourceTables(tableEnv);
        // 2. Join them into the "join_result" view.
        joinTables(tableEnv);
        // 3. Write the view to Kafka.
        writeToKafka(tableEnv);
    }

    /**
     * Declares the {@code target} sink table and inserts every row of {@code join_result} into it.
     *
     * <p>The insert is positional ({@code select *}), so the column order declared here
     * must stay aligned with the projection order in {@link #joinTables}.
     *
     * @param tableEnv table environment that already contains the {@code join_result} view
     */
    private void writeToKafka(StreamTableEnvironment tableEnv) {
        // Create the Kafka-backed sink table; the WITH clause comes from FlinkSinkUtil.
        tableEnv.executeSql("CREATE TABLE target ( "+
                "    RECORD_ID                              STRING     "+
                "    ,HOSPITAL_NAME                         STRING     "+
                "    ,HOSPITAL_CODE                         STRING     "+
                "    ,AGE_UNIT                              STRING     "+
                "    ,AGE                                   STRING     "+
                "    ,SEX_NAME                              STRING     "+
                "    ,SEX_CODE                              STRING     "+
                "    ,ID_CARD_NUMBER                        STRING     "+
                "    ,PATIENT_VISIT_CATEGORY_CODE           STRING     "+
                "    ,PATIENT_VISIT_CATEGORY                STRING     "+
                "    ,INPATIENT_VISIT_FLOW_ID               STRING     "+
                "    ,INPATIENT_NO                          STRING     "+
                "    ,PATIENT_ID                            STRING     "+
                "    ,PATIENT_NAME                          STRING     "+
                "    ,DOMAIN_ID                             STRING     "+
                "    ,VISIT_DATE                            STRING     "+
                "    ,VISIT_TIMES                           STRING     "+
                "    ,BIRTH_DATE                            STRING     "+
                "    ,AUTHOR_CODE                           STRING     "+
                "    ,AUTHOR_NAME                           STRING     "+
                "    ,CUSTODIAN_CODE                        STRING     "+
                "    ,CUSTODIAN_NAME                        STRING     "+
                "    ,ELEC_SIGN_DATETIME                    STRING     "+
                "    ,DOCTOR_ID                             STRING     "+
                "    ,DOCTOR_SIGN_NAME                      STRING     "+
                "    ,BRIEF_SUMMARY_DATETIME                STRING     "+
                "    ,ADMIT_DATETIME                        STRING     "+
                "    ,BED_NO                                STRING     "+
                "    ,BED_NAME                              STRING     "+
                "    ,ROOM_NO                               STRING     "+
                "    ,ROOM_NAME                             STRING     "+
                "    ,WARD_ID                               STRING     "+
                "    ,WARD_NAME                             STRING     "+
                "    ,DEPT_ID                               STRING     "+
                "    ,DEPT_NAME                             STRING     "+
                "    ,CHIEF_COMPLAINT                       STRING     "+
                "    ,ADMIT_CONDITION                       STRING     "+
                "    ,PRESENT_CONDITION                     STRING     "+
                "    ,CHINA_MED_OBSERVE_RESULT              STRING     "+
                "    ,ADMIT_DIAG_ICD_CODE                   STRING     "+
                "    ,ADMIT_DIAG_ICD_NAME                   STRING     "+
                "    ,ADMIT_CHINA_DISEASE_CODE              STRING     "+
                "    ,ADMIT_CHINA_DISEASE_NAME              STRING     "+
                "    ,ADMIT_CHINA_SYNDROME_CODE             STRING     "+
                "    ,ADMIT_CHINA_SYNDROME_NAME             STRING     "+
                "    ,NEXT_TREAT_SCHEME                     STRING     "+
                "    ,THERAPEUTIC_PRINCIPLE                 STRING     "+
                "    ,ORDERS_CONTENT                        STRING     "+
                "    ,CHINA_DECOCTING_METHOD                STRING     "+
                "    ,CHINA_USED_METHOD                     STRING     "+
                "    ,PRESENT_DIAG_ICD_CODE                 STRING     "+
                "    ,PRESENT_DIAG_ICD_NAME                 STRING     "+
                "    ,PRESENT_CHINA_DISEASE_CODE            STRING     "+
                "    ,PRESENT_CHINA_DISEASE_NAME            STRING     "+
                "    ,PRESENT_CHINA_SYNDROME_CODE           STRING     "+
                "    ,PRESENT_CHINA_SYNDROME_NAME           STRING     " +
                "    ,VISIT_ID                              STRING     "+
                "    ,PRIMARY KEY (RECORD_ID) NOT ENFORCED             "+
                ") "+ FlinkSinkUtil.getUpsertKafkaWith(Constant.TOPIC_DWD_MHS_IPT_STAGE_SUMMARY));

        // Load the joined rows into the sink.
        tableEnv.executeSql("insert into target select * from join_result");
    }

    /**
     * Left-joins {@code hdsd00_14_01} (alias {@code a}) with {@code tdy_list_diag}
     * (alias {@code b}) and registers the projection as the temporary view {@code join_result}.
     *
     * <p>Each diagnosis column pair is filled only when {@code b.diag_type} equals the
     * matching literal; otherwise it is the empty string.
     *
     * <p>NOTE(review): every joined {@code b} row yields its own output row carrying at most
     * one populated diagnosis pair, while the sink is keyed on {@code RECORD_ID}
     * ({@code a.unique_id}). If {@code tdy_list_diag} holds several rows per
     * {@code unique_id} (presumably one per diagnosis — confirm), later rows would replace
     * earlier ones under the same key. Consider aggregating per {@code unique_id} if all
     * diagnosis pairs are expected in one record.
     *
     * @param tableEnv table environment in which both source tables are already registered
     */
    private void joinTables(StreamTableEnvironment tableEnv) {
        // 1. Keep idle join state for 30 minutes before it may be cleaned up.
        tableEnv.getConfig().setIdleStateRetention(Duration.ofMinutes(30));
        // 2. Run the join.
        Table joinResult = tableEnv.sqlQuery("select    a.unique_id    AS    RECORD_ID                                 "+
                "    ,a.hospital_name                 AS    HOSPITAL_NAME                                                               "+
                "    ,a.hospital_code                 AS    HOSPITAL_CODE                                                               "+
                "    ,a.age_unit                      AS    AGE_UNIT                                                                    "+
                "    ,a.age                           AS    AGE                                                                         "+
                "    ,a.sex                           AS    SEX_NAME                                                                    "+
                "    ,a.sex_code                      AS    SEX_CODE                                                                    "+
                "    ,a.identify_no                   AS    ID_CARD_NUMBER                                                              "+
                "    ,a.patient_typecode              AS    PATIENT_VISIT_CATEGORY_CODE                                                 "+
                "    ,a.patient_type                  AS    PATIENT_VISIT_CATEGORY                                                      "+
                "    ,a.visit_id                      AS    INPATIENT_VISIT_FLOW_ID                                                     "+
                "    ,a.inpatient                     AS    INPATIENT_NO                                                                "+
                "    ,a.patient_id                    AS    PATIENT_ID                                                                  "+
                "    ,a.patient_name                  AS    PATIENT_NAME                                                                "+
                "    ,a.domain_id                     AS    DOMAIN_ID                                                                   "+
                "    ,a.visit_date                    AS    VISIT_DATE                                                                  "+
                "    ,a.visit_times                   AS    VISIT_TIMES                                                                 "+
                "    ,a.birth                         AS    BIRTH_DATE                                                                  "+
                "    ,a.author_code                   AS    AUTHOR_CODE                                                                 "+
                "    ,a.author_name                   AS    AUTHOR_NAME                                                                 "+
                "    ,a.custodian_code                AS    CUSTODIAN_CODE                                                              "+
                "    ,a.custodian_name                AS    CUSTODIAN_NAME                                                              "+
                "    ,a.doctor_signdate               AS    ELEC_SIGN_DATETIME                                                          "+
                "    ,a.doctor_code                   AS    DOCTOR_ID                                                                   "+
                "    ,a.doctor_name                   AS    DOCTOR_SIGN_NAME                                                            "+
                "    ,a.summary_date                  AS    BRIEF_SUMMARY_DATETIME                                                      "+
                "    ,a.admission_date                AS    ADMIT_DATETIME                                                              "+
                "    ,a.bed_no                        AS    BED_NO                                                                      "+
                "    ,a.bed_name                      AS    BED_NAME                                                                    "+
                "    ,a.ward_id                       AS    ROOM_NO                                                                     "+
                "    ,a.ward_name                     AS    ROOM_NAME                                                                   "+
                "    ,a.wards_id                      AS    WARD_ID                                                                     "+
                "    ,a.wards_name                    AS    WARD_NAME                                                                   "+
                "    ,a.dept_code                     AS    DEPT_ID                                                                     "+
                "    ,a.dept_name                     AS    DEPT_NAME                                                                   "+
                "    ,a.chief_complaint               AS    CHIEF_COMPLAINT                                                             "+
                "    ,a.admission_status              AS    ADMIT_CONDITION                                                             "+
                "    ,a.current_situation             AS    PRESENT_CONDITION                                                           "+
                "    ,a.tcm_four_diagnosis            AS    CHINA_MED_OBSERVE_RESULT                                                    "+
                "    ,case when b.diag_type='入院诊断-西医诊断' then b.diag_code else '' end        AS       ADMIT_DIAG_ICD_CODE           "+
                "    ,case when b.diag_type='入院诊断-西医诊断' then b.diag_name else '' end        AS       ADMIT_DIAG_ICD_NAME           "+
                "    ,case when b.diag_type='入院诊断-中医病名' then b.diag_code else '' end        AS       ADMIT_CHINA_DISEASE_CODE      "+
                "    ,case when b.diag_type='入院诊断-中医病名' then b.diag_name else '' end        AS       ADMIT_CHINA_DISEASE_NAME      "+
                "    ,case when b.diag_type='入院诊断-中医证候' then b.diag_code else '' end        AS       ADMIT_CHINA_SYNDROME_CODE     "+
                "    ,case when b.diag_type='入院诊断-中医证候' then b.diag_name else '' end        AS       ADMIT_CHINA_SYNDROME_NAME     "+
                "    ,a.treatment_plan                AS    NEXT_TREAT_SCHEME                                                           "+
                "    ,a.principle_and_method          AS    THERAPEUTIC_PRINCIPLE                                                       "+
                "    ,a.order_content                 AS    ORDERS_CONTENT                                                              "+
                "    ,a.decoct_method                 AS    CHINA_DECOCTING_METHOD                                                      "+
                "    ,a.tcm_drug_use                  AS    CHINA_USED_METHOD                                                           "+
                "    ,case when b.diag_type='目前诊断-西医诊断' then b.diag_code else '' end        AS       PRESENT_DIAG_ICD_CODE         "+
                "    ,case when b.diag_type='目前诊断-西医诊断' then b.diag_name else '' end        AS       PRESENT_DIAG_ICD_NAME         "+
                "    ,case when b.diag_type='目前诊断-中医病名' then b.diag_code else '' end        AS       PRESENT_CHINA_DISEASE_CODE    "+
                "    ,case when b.diag_type='目前诊断-中医病名' then b.diag_name else '' end        AS       PRESENT_CHINA_DISEASE_NAME    "+
                "    ,case when b.diag_type='目前诊断-中医证候' then b.diag_code else '' end        AS       PRESENT_CHINA_SYNDROME_CODE   "+
                "    ,case when b.diag_type='目前诊断-中医证候' then b.diag_name else '' end        AS       PRESENT_CHINA_SYNDROME_NAME   " +
                // Qualified with the alias so the reference cannot become ambiguous
                // should tdy_list_diag also expose a visit_id column.
                "    ,a.visit_id AS VISIT_ID "+
                " from hdsd00_14_01 a                                                                                                   "+
                " left join tdy_list_diag b                                                                                             "+
                " on a.unique_id = b.unique_id and a.xds_type = b.xds_type                                                            ");
        tableEnv.createTemporaryView("join_result",joinResult);

    }

    /**
     * Registers the two source tables used by the join: {@code hdsd00_14_01}, declared
     * here with all-string columns, and {@code tdy_list_diag}, registered by the
     * inherited {@code readTdyListDiag} helper.
     *
     * @param tableEnv table environment in which the tables are registered
     */
    private void createSourceTables(StreamTableEnvironment tableEnv) {
        // Create table hdsd00_14_01; the WITH clause comes from FlinkSourceUtil.
        tableEnv.executeSql("create table hdsd00_14_01 (       " +
                "pk                       string," +
                "upload_time              string," +
                "status                   string," +
                "empi                     string," +
                "encounter_id             string," +
                "visit_date               string," +
                "visit_domain             string," +
                "visit_id                 string," +
                "visit_times              string," +
                "patient_id               string," +
                "patient_domain           string," +
                "unique_id                string," +
                "xds_type                 string," +
                "xds_name                 string," +
                "effective_time           string," +
                "xds_version              string," +
                "domain_id                string," +
                "patient_type             string," +
                "patient_typecode         string," +
                "inpatient                string," +
                "identify_no              string," +
                "patient_name             string," +
                "sex_code                 string," +
                "sex                      string," +
                "birth                    string," +
                "age                      string," +
                "age_unit                 string," +
                "write_time               string," +
                "author_code              string," +
                "author_name              string," +
                "custodian_code           string," +
                "custodian_name           string," +
                "doctor_signdate          string," +
                "doctor_code              string," +
                "doctor_name              string," +
                "summary_date             string," +
                "admission_date           string," +
                "bed_no                   string," +
                "bed_name                 string," +
                "ward_id                  string," +
                "ward_name                string," +
                "wards_id                 string," +
                "wards_name               string," +
                "dept_code                string," +
                "dept_name                string," +
                "hospital_code            string," +
                "hospital_name            string," +
                "chief_complaint          string," +
                "admission_status         string," +
                "current_situation        string," +
                "tcm_four_diagnosis       string," +
                "treatment_plan           string," +
                "principle_and_method     string," +
                "order_content            string," +
                "decoct_method            string," +
                "tcm_drug_use             string," +
                "treatment_process        string" +
                ")" + FlinkSourceUtil.getKafkaWith("hdsd00_14_01", APP_NAME));

        // Create table tdy_list_diag via the shared helper in BaseSqlApp.
        super.readTdyListDiag(tableEnv, APP_NAME);
    }
}
